
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
ce51cfd574

+ 1 - 1
src/main/resources/env.yaml

@@ -1,5 +1,5 @@
 profile:
-  activate: dev
+  activate: prod
 
 ---
 env:

+ 192 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -0,0 +1,192 @@
+package com.winhc.bigdata.spark.jobs.message
+
+import com.winhc.bigdata.spark.udf.MapAggs
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Json
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * @Description: Dynamic changes in intellectual property
+ * @author π
+ * @date 2020/7/27 19:15
+ */
+object IntellectualMessage {
+  def main(args: Array[String]): Unit = {
+
+    val project = "winhc_eci_dev"
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
+    //    IntellectualMessage(spark, project, "").calc()
+    IntellectualMessage(spark, project, "").transForm("20200727", "tmp_xf_ads_intellectual_message")
+    spark.stop()
+  }
+
+}
+
+
+case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def col2Map(pkg: String, day: String): Map[String, String] = {
+    Map(pkg -> day)
+  }
+
+  def calc(): Unit = {
+    spark.udf.register("col2Map", col2Map _)
+    spark.udf.register("MapAggs", new MapAggs())
+
+    val t1 = s"company_tm" //trademark
+    val t2 = s"company_patent_list" //patent
+    val t3 = s"company_icp" //website
+    val t4 = s"company_copyright_reg_list" //copyright
+    val t5 = s"company_certificate" //certificate
+    val t6 = s"company_copyright_works_list" //software copyright
+
+    val res_tb = s"$project.tmp_xf_ads_intellectual_message" //aggregated result table
+
+    val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $res_tb PARTITION(ds=$ds)
+         |select new_cid,date,m,
+         |max(coalesce(split(m[1], ',')[2],'0')) over (partition by new_cid) cnt1,
+         |max(coalesce(split(m[2], ',')[2],'0')) over (partition by new_cid) cnt2,
+         |max(coalesce(split(m[3], ',')[2],'0')) over (partition by new_cid) cnt3,
+         |max(coalesce(split(m[4], ',')[2],'0')) over (partition by new_cid) cnt4,
+         |max(coalesce(split(m[5], ',')[2],'0')) over (partition by new_cid) cnt5,
+         |max(coalesce(split(m[6], ',')[2],'0')) over (partition by new_cid) cnt6
+         |from (
+         |select new_cid,date,MapAggs(m1) m from (
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t1", "app_date", "1")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t2", "pub_date", "2")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t3", "examine_date", "3")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t4", "reg_time", "4")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t5", "start_date", "5")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(s"$project", s"$t6", "reg_time", "6")}
+         |)a
+         |group by new_cid,date
+         |)h
+         |""".stripMargin)
+
+    transForm(ds, res_tb)
+
+  }
+
+  def transForm(ds: String, tb: String): Unit = {
+    sql(
+      s"""
+         |select *
+         |from ${project}.${tb}
+         |where ds = $ds
+         |""".stripMargin).map(r => {
+      val cid = r.getAs[String]("cid")
+      val date = r.getAs[String]("date")
+      val m = r.getAs[Map[String, String]](2)
+      val tags = descStr(m)
+      val content: String = Json(DefaultFormats).write(
+        mutable.LinkedHashMap("商标数量" -> r.getAs[String]("cnt1"),
+          "专利数量" -> r.getAs[String]("cnt2"),
+          "域名数量" -> r.getAs[String]("cnt3"),
+          "著作权数量" -> r.getAs[String]("cnt4"),
+          "证书数量" -> r.getAs[String]("cnt5"),
+          "软著数量" -> r.getAs[String]("cnt6")
+        )
+      )
+      (cid, date, tags, content)
+    }).toDF("cid", "date", "tags", "content").createOrReplaceTempView("res")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${tb}_res PARTITION (ds=$ds)
+         |select * from res
+         |""".stripMargin)
+  }
+
+  def descStr(m: Map[String, String]) = {
+    val list = ListBuffer[String]()
+    if (m.contains("1")) getStr(m("1"), "商标", list)
+    if (m.contains("2")) getStr(m("2"), "专利", list)
+    if (m.contains("3")) getStr(m("3"), "域名", list)
+    if (m.contains("4")) getStr(m("4"), "著作权", list)
+    if (m.contains("5")) getStr(m("5"), "证书", list)
+    if (m.contains("6")) getStr(m("6"), "软著", list)
+    list.toList.mkString(",")
+  }
+
+  def getStr(s: String, name: String, list: ListBuffer[String]) = {
+    val Array(inc_cnt, del_cnt, total_cnt) = s.split(",")
+    if (!inc_cnt.equals("0")) {
+      list += s"新增${name}数量${inc_cnt}个"
+    }
+    if (!del_cnt.equals("0")) {
+      list += s"移除${name}数量${del_cnt}个"
+    }
+  }
+
+  //full-history (stock) calculation
+  def assemblySQL(project: String, table: String, date: String, tp: String) = {
+    s"""
+       |(
+       |SELECT  new_cid
+       |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
+       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
+       |        ,sum(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) del_cnt
+       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
+       |        ,$tp type
+       |FROM    (
+       |         select * from (
+       |            SELECT  rowkey
+       |                    ,new_cid
+       |                    ,$date
+       |                    ,deleted
+       |            FROM    $project.inc_ads_$table x
+       |            WHERE   x.ds > 0
+       |            union all
+       |            SELECT  rowkey
+       |                    ,new_cid
+       |                    ,$date
+       |                    ,deleted
+       |            FROM    $project.ads_$table y
+       |            WHERE   y.ds > 0
+       |         )z
+       |            GROUP BY rowkey
+       |                     ,new_cid
+       |                     ,$date
+       |                     ,deleted
+       |        ) b
+       |)
+       |""".stripMargin
+  }
+}
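
For reference, a minimal Spark-free sketch of the tag-building logic added above; the object name and sample values are illustrative assumptions, not part of the commit. Each type key ("1" = trademark ... "6" = software copyright) maps to an "inc_cnt,del_cnt,total_cnt" string, and getStr/descStr turn the merged map into the human-readable tags column.

import scala.collection.mutable.ListBuffer

object IntellectualTagSketch {
  // mirrors IntellectualMessage.getStr: emit a tag when the added/removed count is non-zero
  def getStr(s: String, name: String, list: ListBuffer[String]): Unit = {
    val Array(inc_cnt, del_cnt, _) = s.split(",")
    if (!inc_cnt.equals("0")) list += s"新增${name}数量${inc_cnt}个"
    if (!del_cnt.equals("0")) list += s"移除${name}数量${del_cnt}个"
  }

  def main(args: Array[String]): Unit = {
    // a merged map as MapAggs would emit for one (new_cid, date) group
    val m = Map("1" -> "2,0,15", "2" -> "0,1,8")
    val tags = ListBuffer[String]()
    if (m.contains("1")) getStr(m("1"), "商标", tags)
    if (m.contains("2")) getStr(m("2"), "专利", tags)
    println(tags.mkString(",")) // 新增商标数量2个,移除专利数量1个
  }
}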

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/model/CompanyEmploymentScore.scala

@@ -86,7 +86,7 @@ case class CompanyEmploymentScore(s: SparkSession, sourceTable: String, tableVie
     val cnt1 = r.getAs[Long]("cnt1")
     val cnt2 = r.getAs[Long]("cnt2")
     flag match {
-      case "302" => employmentScore(id, cid, cnt1,cnt2, kind, prpject)
+      case "208" => employmentScore(id, cid, cnt1,cnt2, kind, prpject)
     }
   }
 
@@ -119,7 +119,7 @@ object CompanyEmploymentScore {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyEmploymentScore(spark, "company_employment","", "302", "start_date", "经营情况", "招聘", "0", "winhc_eci_dev").calc()
+    CompanyEmploymentScore(spark, "company_employment","", "208", "start_date", "经营情况", "招聘", "0", "winhc_eci_dev").calc()
     spark.stop()
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -79,7 +79,7 @@ object CompanyIntellectualsScore {
       return
     }
    //recruitment
-    if (flag.equals("302")) {
+    if (flag.equals("208")) {
       new CompanyEmploymentScore(spark, sourceTable, tableView, flag, time, kind, project, "1", namespace).calc()
       return
     }

+ 48 - 0
src/main/scala/com/winhc/bigdata/spark/udf/MapAggs.scala

@@ -0,0 +1,48 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
+import scala.collection.mutable
+
+/**
+ * Created by π
+ * Desc: aggregate maps into one map per group
+ */
+
+class MapAggs(max: Int = 500) extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(
+    Array[StructField](StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)))
+  )
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)))
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+  }
+
+  override def deterministic: Boolean = true
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val map1 = buffer.getMap[String, String](0).toMap
+    if (map1.size >= max) {
+      return
+    }
+    val map2 = input.getMap[String, String](0).toMap
+    val map_new: mutable.Map[String, String] = (scala.collection.mutable.Map[String, String](map1.toSeq: _*)
+      ++ scala.collection.mutable.Map[String, String](map2.toSeq: _*))
+    buffer.update(0, map_new)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    update(buffer1, buffer2)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    buffer.getAs[Map[String, String]](0)
+  }
+}
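
For reference, a minimal local usage sketch of the MapAggs UDAF added above; the object name, local master, and sample rows are illustrative assumptions. Each input row carries a one-entry map (type -> "inc_cnt,del_cnt,total_cnt"), and the UDAF merges them per group, capping the merged map at max entries.

import com.winhc.bigdata.spark.udf.MapAggs
import org.apache.spark.sql.SparkSession

object MapAggsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("MapAggsExample").getOrCreate()
    import spark.implicits._

    // register the UDAF the same way IntellectualMessage.calc() does
    spark.udf.register("MapAggs", new MapAggs())

    Seq(("c1", Map("1" -> "2,0,15")), ("c1", Map("2" -> "0,1,8")))
      .toDF("new_cid", "m1")
      .createOrReplaceTempView("t")

    // per new_cid the one-entry maps are merged: {"1" -> "2,0,15", "2" -> "0,1,8"}
    spark.sql("select new_cid, MapAggs(m1) as m from t group by new_cid").show(false)

    spark.stop()
  }
}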

+ 8 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -51,6 +51,13 @@ case class CompanyIncrForCidUtils(s: SparkSession,
 
     val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
 
+    //if the latest inc_ods and inc_ads partitions are equal, reuse that partition as runDs instead of exiting
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
     println(
       s"""
          |cols_md5:$cols_md5
@@ -61,12 +68,6 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
-    //if the latest inc_ods and inc_ads partitions are equal, bail out
-    if (lastDsIncOds.equals(lastDsIncAds)) {
-      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      sys.exit(-1)
-    }
-
     //table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
       !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
@@ -113,7 +114,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                                    SELECT  new_cid AS cid
          |                                            ,${columns.mkString(",")}
          |                                    FROM    ${inc_ads_company_tb}
-         |                                    WHERE   ds >= ${runDs}
+         |                                    WHERE   ds > ${remainDs}
          |                                ) b
          |                        ON      a.cid = b.cid
          |                        UNION ALL
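
For reference, a minimal sketch of the revised bail-out behaviour in this file and in CompanyIncrForCidsUtils below; the function name and the defaultRunDs parameter are hypothetical, since the diff does not show how runDs is normally derived. When the newest inc_ods partition equals the newest inc_ads partition, the job now reuses that partition as runDs instead of calling sys.exit(-1).

def resolveRunDs(lastDsIncOds: String, lastDsIncAds: String, defaultRunDs: String): String =
  if (lastDsIncOds.equals(lastDsIncAds)) {
    println("inc_ods equals inc_ads ds ,please delete last ds !!!")
    lastDsIncOds // previously: sys.exit(-1)
  } else {
    defaultRunDs
  }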

+ 8 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -59,6 +59,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
 
+    //if the latest inc_ods and inc_ads partitions are equal, reuse that partition as runDs instead of exiting
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
     println(
       s"""
          |cols_md5:$cols_md5
@@ -69,12 +76,6 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
-    //if the latest inc_ods and inc_ads partitions are equal, bail out
-    if (lastDsIncOds.equals(lastDsIncAds)) {
-      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      sys.exit(-1)
-    }
-
     //table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
       !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
@@ -143,7 +144,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                                                SELECT  new_cid AS cid
          |                                                        ,${sublistTableFieldName.mkString(",")}
          |                                                FROM    ${inc_ads_company_tb_list}
-         |                                                WHERE   ds >= ${runDs}
+         |                                                WHERE   ds > ${remainDs}
          |                                                UNION ALL
          |                                                SELECT  new_cid AS cid
          |                                                        ,${sublistTableFieldName.mkString(",")}