|
@@ -1,53 +1,17 @@
|
|
|
-package com.winhc.bigdata.spark.jobs
|
|
|
+package com.winhc.bigdata.spark.utils
|
|
|
|
|
|
import java.util.Date
|
|
|
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
+
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
+
|
|
|
import scala.annotation.meta.getter
|
|
|
-import scala.collection.mutable
|
|
|
|
|
|
/**
|
|
|
- * 软件著作权 | 作品著作权 | 专利
|
|
|
* π
|
|
|
+ * 拆平cids,落表
|
|
|
*/
|
|
|
|
|
|
-object CompanyIntellectuals {
|
|
|
- val tabMapping: Map[String, Seq[String]] =
|
|
|
- Map("ods_company_copyright_reg" -> Seq("reg_num", "full_name"), //软件著作权
|
|
|
- "ods_company_copyright_works" -> Seq("reg_num", "name"), //作品著作权
|
|
|
- "ods_company_patent" -> Seq("pub_number", "title") //作品著作权
|
|
|
- )
|
|
|
-
|
|
|
- def main(args: Array[String]): Unit = {
|
|
|
- val (sourceTable, cols) = valid(args)
|
|
|
-
|
|
|
- var config = mutable.Map.empty[String, String]
|
|
|
- val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
|
|
|
- println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
- new CompanyIntellectuals(spark, sourceTable, tabMapping, cols).calc()
|
|
|
- println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
|
|
|
- spark.stop()
|
|
|
- }
|
|
|
-
|
|
|
- def valid(args: Array[String]) = {
|
|
|
- if (args.length != 1) {
|
|
|
- println("请输入要计算的table!!!! ")
|
|
|
- sys.exit(-1)
|
|
|
- }
|
|
|
- val Array(sourceTable) = args
|
|
|
-
|
|
|
- val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
|
|
|
- if (cols.isEmpty) {
|
|
|
- println("输入表不存在,请配置计算规则!!! ")
|
|
|
- sys.exit(-1)
|
|
|
- }
|
|
|
- (sourceTable, cols)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
|
|
|
- tabMapping: Map[String, Seq[String]], cols: Seq[String])
|
|
|
- extends LoggingUtils {
|
|
|
+case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
|
|
|
@(transient@getter) val spark: SparkSession = s
|
|
|
|
|
|
import spark.implicits._
|
|
@@ -55,14 +19,15 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
|
|
|
import org.apache.spark.sql.functions._
|
|
|
|
|
|
def calc(): Unit = {
|
|
|
+ println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
val odsTable = s"ods_$sourceTable"
|
|
|
val adsListTable = s"ads_${sourceTable}_list"
|
|
|
val adsTable = s"ads_$sourceTable"
|
|
|
- val companyMapping = "company_name_mapping"
|
|
|
+ val companyMapping = "company_name_mapping_pro"
|
|
|
val ds = BaseUtil.getPartion(odsTable, spark)
|
|
|
//table字段
|
|
|
val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
|
|
|
- val disCol = tabMapping.get(odsTable).get
|
|
|
+ val disCol = cols
|
|
|
|
|
|
sql(s"select * from $odsTable where ds = $ds and cids is not null")
|
|
|
.dropDuplicates(disCol)
|
|
@@ -74,7 +39,7 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
|
|
|
sql(
|
|
|
s"""
|
|
|
|SELECT c.*
|
|
|
- | ,coalesce(d.res_cid,c.cid) as new_cid
|
|
|
+ | ,coalesce(d.new_cid,c.cid) as new_cid
|
|
|
|FROM (
|
|
|
| SELECT *
|
|
|
| ,cid
|
|
@@ -86,8 +51,6 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
|
|
|
|""".stripMargin).dropDuplicates(disCol.:+("new_cid"))
|
|
|
.createOrReplaceTempView(s"t2")
|
|
|
|
|
|
- sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
|
|
|
-
|
|
|
//聚合新cids
|
|
|
val df1 = sql(
|
|
|
s"""
|
|
@@ -104,8 +67,13 @@ case class CompanyIntellectuals(s: SparkSession, sourceTable: String,
|
|
|
|""".stripMargin)
|
|
|
|
|
|
df1.createOrReplaceTempView("t3")
|
|
|
+
|
|
|
+ //sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
|
|
|
+ //sql(s"select ${columns.mkString(",")},new_cids from t3").show(10)
|
|
|
+
|
|
|
//写表
|
|
|
sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cid from t2")
|
|
|
sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cids from t3")
|
|
|
+ println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
|
|
|
}
|
|
|
-}
|
|
|
+}
|