Browse Source

增加知识产权处理逻辑

xufei 4 years ago
parent
commit
072d10626b

+ 0 - 89
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCopyrightReg.scala

@@ -1,89 +0,0 @@
-package com.winhc.bigdata.spark.jobs
-
-import java.util.Date
-
-import com.winhc.bigdata.spark.utils.SparkUtils
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.mutable
-
-/**
- * Software copyright registration job.
- *
- * Reads the hard-coded partition of `ods_company_copyright_reg_mysql`,
- * de-duplicates it on (`reg_num`, `full_name`), splits the `;`-separated
- * `cids` column into one row per cid, maps every cid through
- * `company_name_mapping` (keeping the original cid when no mapping row
- * matches), joins the mapped ids of each record back into a single
- * `new_cids` value and overwrites `ads_company_copyright_reg` with the result.
- *
- * Expects exactly three arguments: executor instances, cores and memory.
- */
-object CompanyCopyrightReg {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 3) {
-      println("请配置计算资源: instances, cores, memory .")
-      System.exit(-1)
-    }
-
-    var config = mutable.Map.empty[String, String]
-    val Array(instances, cores, memory) = args;
-
-    println(
-      s"""
-         |instances : $instances,
-         |cores : $cores,
-         |memory : $memory
-         |""".stripMargin)
-
-    config = mutable.Map("spark.executor.instances" -> instances,
-      "spark.executor.cores" -> cores,
-      "spark.executor.memory" -> memory
-    )
-
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-
-    import spark.implicits._
-    import spark._
-    import org.apache.spark.sql.functions._
-    println("CompanyCopyrightReg calc start! " + new Date().toString)
-
-    val sourceTable = "ods_company_copyright_reg_mysql"
-    val resultTable = "ads_company_copyright_reg"
-    val ds = "20200513" // the partition to process is hard-coded
-
-    sql(s"select * from $sourceTable where ds = $ds")
-      .dropDuplicates("reg_num", "full_name")
-      .createOrReplaceTempView("t1")
-    sql(s"CACHE TABLE t1")
-
-    // Flatten: one row per (record, cid), with the cid mapped through company_name_mapping
-    sql(
-      s"""
-         |SELECT  c.*
-         |        ,coalesce(d.res_cid,c.cid) as res_cid
-         |FROM    (
-         |            SELECT  *
-         |                    ,cid
-         |            FROM    t1 a
-         |            LATERAL VIEW explode(split(cids, ';')) b AS cid
-         |            WHERE   a.ds = $ds
-         |        ) c
-         |LEFT JOIN company_name_mapping d
-         |ON      c.cid = d.cid
-         |""".stripMargin).dropDuplicates("reg_num", "full_name","res_cid")
-      .createOrReplaceTempView(s"t2")
-
-    // Aggregate the mapped cids of each record id back into one ';'-joined value
-    sql(
-      """
-        |SELECT
-        |t1.id ,cids ,reg_num,cat_num,full_name,simple_name,version,author_nationality
-        |,publish_time,reg_time,source_url,create_time,update_time,deleted,x.new_cids
-        |FROM    t1
-        |LEFT JOIN (
-        |              SELECT  id
-        |                      ,concat_ws(';',collect_set(res_cid)) new_cids
-        |              FROM    t2
-        |              GROUP BY id
-        |          ) x
-        |ON      t1.id = x.id
-        |""".stripMargin)
-      .write.mode("overwrite").insertInto(resultTable)
-
-    println("CompanyCopyrightReg calc stop! " + new Date().toString)
-
-    spark.stop()
-  }
-}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.utils.{EsUtils, SparkUtils}
-import org.datanucleus.util.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 /**
  * @Author: XuJiakai

+ 111 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIntellectuals.scala

@@ -0,0 +1,111 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.util.Date
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Entry point of the intellectual-property jobs:
+ * software copyrights | work copyrights | patents.
+ *
+ * Takes exactly one argument: the source table name WITHOUT its `ods_` prefix
+ * (for example `company_patent`). The table must have an entry in
+ * [[CompanyIntellectuals.tabMapping]], otherwise the process exits with -1.
+ * π
+ */
+
+object CompanyIntellectuals {
+  /** De-duplication key columns, keyed by the full ODS table name. */
+  val tabMapping: Map[String, Seq[String]] =
+    Map("ods_company_copyright_reg" -> Seq("reg_num", "full_name"), // software copyright
+    "ods_company_copyright_works" -> Seq("reg_num", "name"), // work copyright
+    "ods_company_patent" -> Seq("pub_number", "title") // patent
+    )
+
+  /**
+   * Validates the arguments, creates the Spark session and runs the calculation.
+   *
+   * @param args a single element: the source table name without the `ods_` prefix
+   */
+  def main(args: Array[String]): Unit = {
+    val (sourceTable, cols) = valid(args)
+
+    val config = mutable.Map.empty[String, String]
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
+      new CompanyIntellectuals(spark, sourceTable, tabMapping, cols).calc()
+      println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
+    } finally {
+      // Release the session even when the calculation fails.
+      spark.stop()
+    }
+  }
+
+  /**
+   * Checks the command line and resolves the de-duplication columns.
+   *
+   * Prints a message and terminates the JVM with exit code -1 when the argument
+   * count is not exactly one or when `ods_<table>` has no (non-empty) entry in
+   * [[tabMapping]].
+   *
+   * @param args raw program arguments
+   * @return the source table name (without prefix) and its de-duplication columns
+   */
+  def valid(args: Array[String]): (String, Seq[String]) = {
+    if (args.length != 1) {
+      println("请输入要计算的table!!!! ")
+      sys.exit(-1)
+    }
+    val sourceTable = args.head
+
+    tabMapping.get("ods_" + sourceTable).filter(_.nonEmpty) match {
+      case Some(cols) => (sourceTable, cols)
+      case None =>
+        println("输入表不存在,请配置计算规则!!!   ")
+        sys.exit(-1)
+    }
+  }
+}
+
+/**
+ * Normalises the company ids of one intellectual-property table.
+ *
+ * @param s           Spark session used for every query
+ * @param sourceTable table name without the `ods_` / `ads_` prefix
+ * @param tabMapping  de-duplication key columns keyed by full ODS table name
+ * @param cols        de-duplication key columns to fall back on when `tabMapping`
+ *                    has no entry for `ods_<sourceTable>`
+ */
+case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
+                                tabMapping: Map[String, Seq[String]], cols: Seq[String])
+  extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+  import spark._
+  import org.apache.spark.sql.functions._
+
+  /**
+   * Reads the partition returned by `BaseUtil.getPartion` from `ods_<sourceTable>`,
+   * splits the `;`-separated `cids` column into one row per cid, maps each cid
+   * through `company_name_mapping` (keeping the original cid when no mapping row
+   * matches) and writes two tables for the same partition:
+   *  - `ads_<sourceTable>_list`: one row per (record, mapped cid) in `new_cid`
+   *  - `ads_<sourceTable>`: one row per record with the mapped cids joined by `;`
+   *    in `new_cids`
+   */
+  def calc(): Unit = {
+    val odsTable = s"ods_$sourceTable"
+    val adsListTable = s"ads_${sourceTable}_list"
+    val adsTable = s"ads_$sourceTable"
+    val companyMapping = "company_name_mapping"
+    val ds = BaseUtil.getPartion(odsTable, spark)
+    // All columns of the source table except the partition column.
+    val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filterNot(_ == "ds")
+    val columnList = columns.mkString(",")
+    // Prefer the mapping entry; use the explicitly supplied columns when it is missing
+    // instead of failing on Option.get.
+    val disCol = tabMapping.getOrElse(odsTable, cols)
+
+    // The partition value is quoted, matching how the score job filters on ds.
+    sql(s"select * from $odsTable where ds = '$ds' and cids is not null")
+      .dropDuplicates(disCol)
+      .createOrReplaceTempView("t1")
+
+    sql("CACHE TABLE t1")
+
+    // Flatten: one row per (record, cid), with the cid replaced by its mapped id if any.
+    sql(
+      s"""
+         |SELECT  c.*
+         |        ,coalesce(d.res_cid,c.cid) as new_cid
+         |FROM    (
+         |            SELECT  *
+         |                    ,cid
+         |            FROM    t1 a
+         |            LATERAL VIEW explode(split(cids, ';')) b AS cid
+         |        ) c
+         |LEFT JOIN $companyMapping d
+         |ON      c.cid = d.cid
+         |""".stripMargin).dropDuplicates(disCol :+ "new_cid")
+      .createOrReplaceTempView("t2")
+
+    // Preview of the flattened rows; note that this runs the t2 query once on its own.
+    sql(s"select $columnList,new_cid from t2").show(10)
+
+    // Aggregate the mapped cids of every record id into one ';'-joined value.
+    val df1 = sql(
+      s"""
+         |SELECT
+         |$columnList,x.new_cids
+         |FROM    t1
+         |LEFT JOIN (
+         |              SELECT  id as new_id
+         |                      ,concat_ws(';',collect_set(new_cid)) new_cids
+         |              FROM    t2
+         |              GROUP BY id
+         |          ) x
+         |ON      t1.id = x.new_id
+         |""".stripMargin)
+
+    df1.createOrReplaceTempView("t3")
+    // Write both result tables for the processed partition.
+    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select $columnList,new_cid from t2")
+    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select $columnList,new_cids from t3")
+  }
+}

+ 213 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIntellectualsScore.scala

@@ -0,0 +1,213 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.util.Date
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Entry point of the scoring jobs for
+ * software copyrights | work copyrights | patents.
+ * π
+ */
+
+object CompanyIntellectualsScore {
+
+  // source table -> (flag(s), ordering time column, kind label, project label(s))
+  val tabMapping: Map[String, (String, String, String, String)] =
+    Map(
+      // software copyright
+      "ads_company_copyright_reg_list" -> ("1", "publish_time", "资产权益", "软著作权"),
+      // work copyright
+      "ads_company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"),
+      // patents: two flags and two project labels, each pair separated by ';'
+      "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利")
+    )
+
+  def main(args: Array[String]): Unit = {
+    val (sourceTable, flag, time, kind, project) = valid(args)
+    val config = mutable.Map.empty[String, String]
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    // Patents carry two flags ("3;4") and are scored once per flag; a flag without
+    // ';' splits into a single element, so one loop covers both cases.
+    for (singleFlag <- flag.split(";")) {
+      new CompanyIntellectualsScore(spark, sourceTable, singleFlag, time, kind, project).calc()
+    }
+
+    println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
+    spark.stop()
+  }
+
+  // Validates the single table argument and returns it with its mapping entry;
+  // prints a message and exits with -1 on a wrong argument count or unknown table.
+  def valid(args: Array[String]) = {
+    if (args.length != 1) {
+      println("请输入要计算的table!!!! ")
+      sys.exit(-1)
+    }
+    val sourceTable = args(0)
+
+    tabMapping.get(sourceTable) match {
+      case Some((flag, time, kind, project))
+        if flag.nonEmpty && time.nonEmpty && kind.nonEmpty && project.nonEmpty =>
+        (sourceTable, flag, time, kind, project)
+      case _ =>
+        println("输入表不存在!!!   ")
+        sys.exit(-1)
+    }
+  }
+}
+
+/**
+ * Scores one intellectual-property dimension per company.
+ *
+ * @param s           Spark session used for every query
+ * @param sourceTable table to score; read columns are `id`, `new_cid`, `ds`, the
+ *                    `time` column and, for patents, `pub_number`
+ * @param flag        "1" software copyright, "2" work copyright,
+ *                    "3" utility-model / design patent, "4" invention patent
+ * @param time        column used to pick the newest row of each company
+ * @param kind        kind label, also looked up in `DimScoreV2.newsEventMap`
+ * @param project     project label; for patents two labels joined by `;`
+ *                    (index 0 is used for flag "3", index 1 for flag "4")
+ */
+case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
+                                     flag: String, time: String, kind: String, project: String
+                                    ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  /**
+   * For every `new_cid` in the selected partition, counts its rows (`cnt1`),
+   * keeps the newest row by `time`, turns it into a score row and overwrites
+   * `<sourceTable><suffix>_score`, where the suffix is `_s` for flag "3",
+   * `_f` for flag "4" and empty otherwise.
+   */
+  def calc(): Unit = {
+    // (extra predicate on the patent number, suffix of the target table)
+    val (appsql, apptab) = flag match {
+      case "3" => ("AND substring(pub_number, 7,1) in ('2','3')", "_s") // utility-model / design patents
+      case "4" => ("AND substring(pub_number, 7,1) not in ('2','3')", "_f") // invention patents
+      case _ => ("", "")
+    }
+    val ds = BaseUtil.getPartion(sourceTable, spark)
+    val tmpView = s"${sourceTable}_tmp_view"
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $sourceTable
+         |        WHERE   ds = '$ds'
+         |        AND     new_cid IS NOT NULL ${appsql}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => trans(r, flag, kind, project))
+      .toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+        "score", "total", "extraScore")
+      .createOrReplaceTempView(tmpView)
+
+    logger.info(
+      s"""
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+         |${showString(sql(s"select * from $tmpView"))}
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+       """.stripMargin)
+
+    sql(s"insert overwrite table ${sourceTable}${apptab}_score  select * from $tmpView")
+  }
+
+  /**
+   * Converts one aggregated row into a score row using the scorer selected by `flag`.
+   *
+   * @param r       row providing `id`, `new_cid` and `cnt1`
+   * @param flag    scorer selector, "1" to "4"
+   * @param kind    kind label
+   * @param prpject project label (parameter name kept as-is for compatibility)
+   * @throws IllegalArgumentException when `flag` is not one of "1", "2", "3", "4"
+   */
+  def trans(r: Row, flag: String, kind: String, prpject: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    flag match {
+      case "1" => getSoftCopyRightScore(id, cid, cnt1, kind, prpject)
+      case "2" => getCopyRightScore(id, cid, cnt1, kind, prpject)
+      case "3" => ordinaryPatentScore(id, cid, cnt1, kind, prpject)
+      case "4" => inventScore(id, cid, cnt1, kind, prpject)
+      // Fail with a descriptive message instead of a bare MatchError.
+      case other => throw new IllegalArgumentException(s"unsupported score flag: $other")
+    }
+  }
+
+  // Assembles the output row shared by all scorers; the two codes are looked up
+  // in DimScoreV2.newsEventMap by label.
+  private def scoreRow(id: Long, cid: String, kind: String, project: String, ty: String,
+                       score: Float, total: Float, extraScore: Float) =
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+
+  /**
+   * Software copyright score (total 3).
+   * NOTE(review): above 50 items the score is 8 while total stays 3 and
+   * extraScore stays 0 - presumably the +5 bonus is folded into score; confirm.
+   */
+  def getSoftCopyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    val (score, ty) =
+      if (cnt1 > 50) (8f, "软件著作权数量>50,该项分数上加5分")
+      else if (cnt1 > 5) (3f, "授予数量>5")
+      else if (cnt1 > 0) (2f, "授予数量≤5")
+      else (1f, "无")
+    scoreRow(id, cid, kind, project, ty, score, total = 3f, extraScore = 0f)
+  }
+
+  /** Work copyright score (total 2). */
+  def getCopyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    val (score, ty) =
+      if (cnt1 > 3) (2f, "授予数量>3")
+      else if (cnt1 == 0) (0f, "无")
+      else (1f, "授予数量≤3")
+    scoreRow(id, cid, kind, project, ty, score, total = 2f, extraScore = 0f)
+  }
+
+  /** Utility-model / design patent score (total 3); uses the first `;`-separated project label. */
+  def ordinaryPatentScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    val project_s = project.split(";")(0)
+    val (score, ty) =
+      if (cnt1 == 0) (1f, "无")
+      else if (cnt1 <= 5) (2f, "授予数量≤5")
+      else (3f, "授予数量>5")
+    scoreRow(id, cid, kind, project_s, ty, score, total = 3f, extraScore = 0f)
+  }
+
+  /**
+   * Invention patent score (total 7); uses the second `;`-separated project label.
+   * NOTE(review): above 50 items the score is 12 while total stays 7 and
+   * extraScore stays 0 - presumably the +5 bonus is folded into score; confirm.
+   */
+  def inventScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    val project_s = project.split(";")(1)
+    val (score, ty) =
+      if (cnt1 == 0) (3f, "无")
+      else if (cnt1 <= 3) (5f, "授予数量≤3")
+      else if (cnt1 <= 50) (7f, "授予数量>3")
+      else (12f, "发明专利数量>50,该项分数上加5分")
+    scoreRow(id, cid, kind, project_s, ty, score, total = 7f, extraScore = 0f)
+  }
+
+}