
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

许家凯 4 years ago
commit 883b17a97c

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryInc.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.jobs
 
+import com.winhc.bigdata.spark.model.CompanyIntellectualsScore
 import com.winhc.bigdata.spark.utils.{CompanyIncSummary, SparkUtils}
 
 import scala.collection.mutable
@@ -24,6 +25,7 @@ object CompanySummaryInc {
     )
     val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
     CompanyIncSummary(spark, project, tableName, cidField, dupliCols.split(",").seq).calc
+    CompanyIntellectualsScore.start(spark, project, tableName, "inc_tmp_view")
     spark.stop()
   }
 }

+ 73 - 29
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.model
 import java.util.Date
 
 import com.winhc.bigdata.calc.DimScoreV2
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
@@ -18,17 +18,17 @@ import scala.collection.mutable
 object CompanyIntellectualsScore {
 
   val tabMapping: Map[String, (String, String, String, String)] =
-    Map("ads_company_copyright_reg_list" -> ("1", "publish_time", "资产权益", "软著作权"), //软件著作权
-      "ads_company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"), //作品著作权
-      "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
-      "ads_company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
-      "ads_company_tm" -> ("6", "app_date", "资产权益", "商标"), //商标
-      "ads_company_land_announcement" -> ("7", "commit_time", "资产权益", "购地信息") //购地信息
+    Map("company_copyright_reg_list" -> ("1", "publish_time", "资产权益", "软著作权"), //软件著作权
+      "company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"), //作品著作权
+      "company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
+      "company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
+      "company_tm" -> ("6", "app_date", "资产权益", "商标"), //商标
+      "company_land_announcement" -> ("7", "commit_time", "资产权益", "购地信息") //购地信息
     )
 
   def main(args: Array[String]): Unit = {
 
-    val (sourceTable, flag, time, kind, project) = valid(args)
+    val (sourceTable, flag, time, kind, project, namespace) = valid(args)
 
     var config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
@@ -42,11 +42,12 @@ object CompanyIntellectualsScore {
 
    //patents are split into two parts
     if (flag.contains(";")) {
+
       flag.split(";").foreach(f => {
-        new CompanyIntellectualsScore(spark, sourceTable, f, time, kind, project).calc()
+        new CompanyIntellectualsScore(spark, sourceTable, "", f, time, kind, project, "0", namespace + ".").calc()
       })
     } else {
-      new CompanyIntellectualsScore(spark, sourceTable, flag, time, kind, project).calc()
+      new CompanyIntellectualsScore(spark, sourceTable, "", flag, time, kind, project, "0", namespace + ".").calc()
     }
 
     println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
@@ -55,24 +56,43 @@ object CompanyIntellectualsScore {
   }
 
   def valid(args: Array[String]) = {
-    if (args.length != 1) {
-      println("请输入要计算的table!!!! ")
+    if (args.length != 2) {
+      println("请输入要计算的 工作空间,table !!!! ")
       sys.exit(-1)
     }
-    val Array(sourceTable) = args
-    println(sourceTable.substring(sourceTable.indexOf(".") + 1))
+    val Array(namespace, sourceTable) = args
 
-    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable.substring(sourceTable.indexOf(".") + 1), ("", "", "", ""))
+    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
       println("输入表不存在!!!   ")
       sys.exit(-1)
     }
-    (sourceTable, flag, time, kind, project)
+
+    (sourceTable, flag, time, kind, project, namespace)
+  }
+
+  //intellectual property model calculation
+  def start(spark: SparkSession, namespace: String, sourceTable: String, tableView: String): Unit = {
+    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
+    if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
+      println("输入模型计算表不存在!!! ")
+      sys.exit(-1)
+    }
+
+    //patents are split into two parts
+    if (flag.contains(";")) {
+      flag.split(";").foreach(f => {
+        new CompanyIntellectualsScore(spark, sourceTable, tableView, f, time, kind, project, "1", namespace + ".").calc()
+      })
+    } else {
+      new CompanyIntellectualsScore(spark, sourceTable, tableView, flag, time, kind, project, "1", namespace + ".").calc()
+    }
   }
 }
 
-case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
-                                     flag: String, time: String, kind: String, project: String
+case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, tableView: String = "",
+                                     flag: String, time: String, kind: String, project: String,
+                                     tp: String = "0", namespace: String = ""
                                     ) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
@@ -81,8 +101,21 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
 
   def calc(): Unit = {
     //val targetTable = "ads_company_total_score"
-    val targetTable = sourceTable + "_score"
-    val ds = BaseUtil.getPartion(sourceTable, spark)
+    val adsTable = namespace + "ads_" + sourceTable
+    val incAdsTable = namespace + "inc_ads_" + sourceTable
+    val targetTable = namespace + "ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //distinguish partitioned ads table input from the incremental view
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
 
     var appsql = ""
     var apptab = ""
@@ -104,9 +137,9 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
          |                *
          |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
          |                ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
-         |        FROM    $sourceTable
-         |        WHERE   ds = ${ds}
-         |        AND     new_cid IS NOT NULL ${appsql}
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2} ${appsql}
          |        ) a
          |WHERE   num =1
          |""".stripMargin)
@@ -117,15 +150,26 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"t1_view")
 
-    //    logger.info(
-    //      s"""
-    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
-    //         |${showString(sql(s"select * from t1_view"))}
-    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
-    //           """.stripMargin)
+    logger.info(
+      s"""
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+         |${showString(sql(s"select * from t1_view"))}
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+               """.stripMargin)
 
     sql(s"insert overwrite table ${targetTable}${apptab} " +
       s"partition (ds='${ds}')  select * from t1_view")
+
+    val dataFrame = sql(
+      s"""
+         |select
+         |CONCAT_WS('_',cid,id) AS rowkey,
+         |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+         |from t1_view
+         |""".stripMargin)
+
+    //sync to HBase
+    Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
   }
 
  def trans(r: Row, flag: String, kind: String, project: String) = {

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/utils/Maxcomputer2Hbase.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame}
+
+import scala.collection.mutable
+
+/**
+ * @Description: export a DataFrame to HBase
+ * @author π
+ * @date 2020/6/29 11:18
+ */
+case class Maxcomputer2Hbase(dataFrame: DataFrame
+                             , hbaseTable: String) extends Logging {
+  lazy val f_bytes: Array[Byte] = Bytes.toBytes("F")
+
+  def syn(): Unit = {
+    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseTable.toUpperCase)
+    import org.apache.spark.sql.functions.col
+    //cast all DataFrame columns to string
+    val columns: Array[String] = dataFrame.columns
+    val df2: DataFrame = dataFrame.select(columns.map(column => col(column).cast("string")): _*)
+    df2.printSchema()
+    df2.rdd.map(row => {
+      try {
+        val rowkey = row.getAs[String]("rowkey")
+        val put = new Put(Bytes.toBytes(rowkey))
+        columns.filter(_ != "rowkey").foreach(c => {
+          val value = row.getAs[String](c)
+          //skip null cells: Bytes.toBytes(null) would throw and drop the whole row
+          if (value != null) put.addColumn(f_bytes, Bytes.toBytes(c), Bytes.toBytes(value))
+        })
+        (new ImmutableBytesWritable, put)
+      } catch {
+        case e: Exception => {
+          logWarning(row.toString())
+          logError(e.getMessage, e)
+          null
+        }
+      }
+    }).filter(_ != null).saveAsHadoopDataset(jobConf)
+
+  }
+
+}
+
+object Maxcomputer2Hbase {
+  def main(args: Array[String]): Unit = {
+
+    val map = mutable.Map[String, String]()
+    val spark = SparkUtils.InitEnv("Maxcomputer2Hbase", map)
+    import spark._
+    val df1 = spark.createDataFrame(Seq(("1", "xx", 5), ("2", "yy", 3))).toDF("rowkey", "name", "age")
+    df1.printSchema()
+    df1.show(10)
+    Maxcomputer2Hbase(df1, "TEST_HBASE").syn()
+    spark.stop()
+  }
+}