
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

许家凯 4 years ago
commit e57fb79980

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/etl/dwd_company_bid_ods.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.etl
 
-import com.winhc.bigdata.spark.jobs.CompanyInfoCalculator.prepare
+import com.winhc.bigdata.spark.model.CompanyInfoCalculator.prepare
 import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.spark.sql.SparkSession
 

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyForCidUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Website | Trademark | WeChat official account | App info
+ * π
+ */
+
+object CompanyForCid {
+  val tabMapping: Map[String, Seq[String]] =
+    Map("ods_company_icp" -> Seq("web_site", "new_cid"), //网站
+      "ods_company_tm" -> Seq("reg_no", "new_cid"), //商标
+      "ods_company_wechat" -> Seq("public_num", "new_cid"), //微信公众号
+      "ods_company_app_info" -> Seq("name", "new_cid") //产品信息
+    )
+
+  def main(args: Array[String]): Unit = {
+    val (sourceTable, cols) = valid(args)
+    //    var config = mutable.Map.empty[String, String]
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, null)
+    CompanyForCidUtils(spark, sourceTable, cols).calc()
+    spark.stop()
+  }
+
+  def valid(args: Array[String]): (String, Seq[String]) = {
+    println(args.toSeq.mkString(" "))
+    if (args.length == 1) {
+      // single-argument form: fall through to the tabMapping lookup below
+    } else if (args.length == 2) {
+      val Array(sourceTable, cols) = args
+      return (sourceTable, cols.split(";").toSeq)
+    } else {
+      println("请输入要计算的table!!!! ")
+      sys.exit(-1)
+    }
+    val Array(sourceTable) = args
+
+    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
+    if (cols.isEmpty) {
+      println("输入表不存在,请配置计算规则!!!   ")
+      sys.exit(-1)
+    }
+    (sourceTable, cols)
+  }
+}
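
A minimal sketch of the two argument forms the new valid() accepts (return values read off the tabMapping above; the argument strings are illustrative):

    // one argument: columns are looked up under "ods_" + table name
    CompanyForCid.valid(Array("company_icp"))
    // -> ("company_icp", Seq("web_site", "new_cid"))

    // two arguments: columns are passed explicitly, ';'-separated
    CompanyForCid.valid(Array("company_tm", "reg_no;new_cid"))
    // -> ("company_tm", Seq("reg_no", "new_cid"))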

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyForCidsUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Software copyright | Works copyright | Patent
+ * π
+ */
+
+object CompanyForCids {
+  val tabMapping: Map[String, Seq[String]] =
+    Map("ods_company_copyright_reg" -> Seq("reg_num", "full_name"), //软件著作权
+      "ods_company_copyright_works" -> Seq("reg_num", "name"), //作品著作权
+      "ods_company_patent" -> Seq("pub_number", "title") //作品著作权
+    )
+
+  def main(args: Array[String]): Unit = {
+    val (sourceTable, cols) = valid(args)
+    //    var config = mutable.Map.empty[String, String]
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, null)
+    CompanyForCidsUtils(spark, sourceTable, cols).calc()
+    spark.stop()
+  }
+
+  def valid(args: Array[String]): (String, Seq[String]) = {
+    println(args.toSeq.mkString(" "))
+    if (args.length == 1) {
+      // single-argument form: fall through to the tabMapping lookup below
+    } else if (args.length == 2) {
+      val Array(sourceTable, cols) = args
+      return (sourceTable, cols.split(";").toSeq)
+    } else {
+      println("请输入要计算的table!!!! ")
+      sys.exit(-1)
+    }
+    val Array(sourceTable) = args
+
+    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
+    if (cols.isEmpty) {
+      println("输入表不存在,请配置计算规则!!!   ")
+      sys.exit(-1)
+    }
+    (sourceTable, cols)
+  }
+}
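
The same validation applies here; a quick sketch with a key from the mapping above:

    CompanyForCids.valid(Array("company_copyright_reg"))
    // -> ("company_copyright_reg", Seq("reg_num", "full_name"))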

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -1,11 +1,10 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.model
 
 import java.util.Date
 
+import com.winhc.bigdata.calc.DimScoreV2
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.{Row, SparkSession}
-import com.winhc.bigdata.calc.DimScoreV2
-import com.winhc.bigdata.spark.jobs.CompanyCourtAnnouncement.tabMapping
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -58,8 +57,6 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
-  import spark._
-  import org.apache.spark.sql.functions._
 
   def calc(): Unit = {
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculator.scala

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.model
 
 import com.winhc.bigdata.spark.udf.CompanyMapping
 import com.winhc.bigdata.spark.utils.SparkUtils

+ 5 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculatorV2.scala

@@ -1,13 +1,14 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.model
 
-import java.util
 import java.util.Date
-import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
+
+import com.winhc.bigdata.calc.DimScoreV2
 import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Row, SparkSession}
+
 import scala.collection.mutable
 
 object CompanyInfoCalculatorV2 {
@@ -38,8 +39,8 @@ object CompanyInfoCalculatorV2 {
 
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
-    import spark.implicits._
     import spark._
+    import spark.implicits._
 
     println("company calc start! " + new Date().toString)
 

+ 60 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIntellectualsScore.scala

@@ -1,14 +1,17 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.model
 
 import java.util.Date
+
 import com.winhc.bigdata.calc.DimScoreV2
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.{Row, SparkSession}
+
 import scala.annotation.meta.getter
 import scala.collection.mutable
 
 /**
- * Software copyright | Works copyright | Patent
+ * Intellectual-property score
+ * Software copyright | Works copyright | Patent | Trademark | Website
  * π
  */
 
@@ -17,7 +20,9 @@ object CompanyIntellectualsScore {
   val tabMapping: Map[String, (String, String, String, String)] =
     Map("ads_company_copyright_reg_list" -> ("1", "publish_time", "资产权益", "软著作权"), //软件著作权
       "ads_company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"), //作品著作权
-      "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利") //专利
+      "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
+      "ads_company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
+      "ads_company_tm" -> ("6", "app_date", "资产权益", "商标") //商标
     )
 
   def main(args: Array[String]): Unit = {
@@ -115,15 +120,17 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
     val cid = r.getAs[Long]("new_cid").toString
     val cnt1 = r.getAs[Long]("cnt1")
     flag match {
-      case "1" => getSoftCopyRightScore(id, cid, cnt1, kind, prpject)
-      case "2" => getCopyRightScore(id, cid, cnt1, kind, prpject)
+      case "1" => softCopyRightScore(id, cid, cnt1, kind, prpject)
+      case "2" => copyRightScore(id, cid, cnt1, kind, prpject)
       case "3" => ordinaryPatentScore(id, cid, cnt1, kind, prpject)
       case "4" => inventScore(id, cid, cnt1, kind, prpject)
+      case "5" => webSiteScore(id, cid, cnt1, kind, prpject)
+      case "6" => tradeMarkScore(id, cid, cnt1, kind, prpject)
     }
   }
 
   //software copyright
-  def getSoftCopyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+  def softCopyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
     var score = 0f
     val total = 3f
     val extraScore = 0f
@@ -146,7 +153,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
   }
 
   //works copyright
-  def getCopyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+  def copyRightScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
     var score = 0f
     val total = 2f
     val extraScore = 0f
@@ -210,4 +217,50 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       score, total, extraScore)
   }
 
+  //website
+  def webSiteScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 1f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 > 20) {
+      score = 4f
+      ty = "网站域名数量>20,该项分数上加3分"
+    } else if (cnt1 > 0) {
+      score = 1f
+      ty = "有"
+    } else {
+      score = 0f
+      ty = "无"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //商标
+  def tradeMarkScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 4f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 0f
+      ty = "无"
+    } else if (cnt1 <= 2) {
+      score = 2f
+      ty = "授予数量≤2"
+    } else if (cnt1 <= 8) {
+      score = 3f
+      ty = "授予数量>2,≤8"
+    } else if (cnt1 <= 100) {
+      score = 4f
+      ty = "授予数量>8"
+    } else {
+      score = 9f
+      ty = "商标数量>100,该项分数上加5分"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
 }
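
Standalone replicas of the two new tier functions, handy for sanity-checking the thresholds (illustrative helpers, not part of the repo; values are read off the branches above):

    def webSiteTier(cnt: Long): Float =
      if (cnt > 20) 4f     // >20 domains: +3 bonus over the nominal total of 1
      else if (cnt > 0) 1f // has a website
      else 0f              // none

    def trademarkTier(cnt: Long): Float =
      if (cnt == 0) 0f        // none
      else if (cnt <= 2) 2f   // granted <= 2
      else if (cnt <= 8) 3f   // 2 < granted <= 8
      else if (cnt <= 100) 4f // granted > 8, the nominal total of 4
      else 9f                 // >100: +5 bonus on top of the total

    assert(webSiteTier(21) == 4f)
    assert(trademarkTier(8) == 3f)
    assert(trademarkTier(101) == 9f)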

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * π
+ * cid conversion (map cid to new_cid)
+ */
+
+case class CompanyForCidUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    val odsTable = s"ods_$sourceTable"
+    val adsTable = s"ads_$sourceTable"
+    val companyMapping = "company_name_mapping_pro"
+    val ds = BaseUtil.getPartion(odsTable, spark)
+    //table columns
+    val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
+    val disCol = cols
+
+    //replace cid with the mapped new_cid
+    sql(
+      s"""
+         |SELECT  ${columns.mkString(",a.")},
+         |     coalesce(b.new_cid,a.cid) AS new_cid
+         |FROM $odsTable a
+         |LEFT JOIN $companyMapping b
+         |ON   a.cid = b.cid
+         |WHERE a.ds = $ds and a.cid is not null
+         |""".stripMargin).dropDuplicates(disCol.:+("new_cid"))
+      .createOrReplaceTempView(s"t2")
+
+    sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
+
+    //write the table
+    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cid from t2")
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
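
For ods_company_icp the interpolated query comes out roughly as below (the column list and the partition value 20200601 are guesses for illustration). Note that columns.mkString(",a.") leaves the first selected column unqualified, which only stays valid while that name is unambiguous between the two tables:

    SELECT  id,a.web_site,a.cid,
         coalesce(b.new_cid,a.cid) AS new_cid
    FROM ods_company_icp a
    LEFT JOIN company_name_mapping_pro b
    ON   a.cid = b.cid
    WHERE a.ds = 20200601 and a.cid is not null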

+ 15 - 47
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIntellectuals.scala

@@ -1,53 +1,17 @@
-package com.winhc.bigdata.spark.jobs
+package com.winhc.bigdata.spark.utils
 
 import java.util.Date
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+
 import org.apache.spark.sql.SparkSession
+
 import scala.annotation.meta.getter
-import scala.collection.mutable
 
 /**
- * Software copyright | Works copyright | Patent
  * π
+ * Flatten cids and write the results out
  */
 
-object CompanyIntellectuals {
-  val tabMapping: Map[String, Seq[String]] =
-    Map("ods_company_copyright_reg" -> Seq("reg_num", "full_name"), //软件著作权
-    "ods_company_copyright_works" -> Seq("reg_num", "name"), //作品著作权
-    "ods_company_patent" -> Seq("pub_number", "title") //作品著作权
-    )
-
-  def main(args: Array[String]): Unit = {
-    val (sourceTable, cols) = valid(args)
-
-    var config = mutable.Map.empty[String, String]
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
-    new CompanyIntellectuals(spark, sourceTable, tabMapping, cols).calc()
-    println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
-    spark.stop()
-  }
-
-  def valid(args: Array[String]) = {
-    if (args.length != 1) {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-
-    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
-    if (cols.isEmpty) {
-      println("输入表不存在,请配置计算规则!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable, cols)
-  }
-}
-
-case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
-                                tabMapping: Map[String, Seq[String]], cols: Seq[String])
-  extends LoggingUtils {
+case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
@@ -55,14 +19,15 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
   import org.apache.spark.sql.functions._
 
   def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     val odsTable = s"ods_$sourceTable"
     val adsListTable = s"ads_${sourceTable}_list"
     val adsTable = s"ads_$sourceTable"
-    val companyMapping = "company_name_mapping"
+    val companyMapping = "company_name_mapping_pro"
     val ds = BaseUtil.getPartion(odsTable, spark)
     //table columns
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
-    val disCol = tabMapping.get(odsTable).get
+    val disCol = cols
 
     sql(s"select * from $odsTable where ds = $ds and cids is not null")
       .dropDuplicates(disCol)
@@ -74,7 +39,7 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
     sql(
       s"""
          |SELECT  c.*
-         |        ,coalesce(d.res_cid,c.cid) as new_cid
+         |        ,coalesce(d.new_cid,c.cid) as new_cid
          |FROM    (
          |            SELECT  *
          |                    ,cid
@@ -86,8 +51,6 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
          |""".stripMargin).dropDuplicates(disCol.:+("new_cid"))
       .createOrReplaceTempView(s"t2")
 
-    sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
-
     //aggregate the new cids
     val df1 = sql(
       s"""
@@ -104,8 +67,13 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
          |""".stripMargin)
 
     df1.createOrReplaceTempView("t3")
+
+    //sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
+    //sql(s"select ${columns.mkString(",")},new_cids from t3").show(10)
+
     //write the tables
     sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cid from t2")
     sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cids from t3")
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
-}
+}
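
A minimal sketch of driving the relocated helper, mirroring the call in CompanyForCids.main above (the null config follows that call; the table and columns come from its tabMapping):

    val spark: SparkSession = SparkUtils.InitEnv("CompanyForCids", null)
    // writes ads_company_patent_list (one row per cid) and ads_company_patent (aggregated new_cids)
    CompanyForCidsUtils(spark, "company_patent", Seq("pub_number", "title")).calc()
    spark.stop()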