
feat: add a generic summary calculation job

许家凯, 4 years ago
parent commit 55f0189580

+ 80 - 25
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -1,10 +1,13 @@
 package com.winhc.bigdata.spark.jobs
 
-import com.mongodb.spark.MongoSpark
-import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 
+import scala.annotation.meta.getter
 import scala.collection.mutable
 
 /**
@@ -14,34 +17,86 @@ import scala.collection.mutable
  */
 object CompanySummaryBySingle extends Logging {
 
-  def main(args: Array[String]): Unit = {
-    if (args.length < 1) {
-      logError("请输入表名!")
-      sys.exit(-1)
-    }
-    val tableName = args(0)
-    val map = mutable.Map[String, String](
-      "spark.mongodb.output.uri" -> "mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw.company_summary?replicaSet=mgset-6501997"
-      , "spark.hadoop.odps.cupid.vpc.domain.list" -> "{\"regionId\":\"cn-shanghai\",\"vpcs\":[{\"vpcId\":\"vpc-11hby9xee\",\"zones\":[{\"urls\":[{\"domain\":\"dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com\",\"port\":3717}]}]}]}"
-    )
 
-    val spark = SparkUtils.InitEnv("CompanySummaryBySingle", map)
-    import spark._
+  case class CompanySummaryBySingleUtil(s: SparkSession,
+                                        project: String
+                                       ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
 
-    var df = sql(getSummarySql(tableName, "company_id"))
-    val lookup = Map(
-      "company_id" -> "_id"
-      , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
-      , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
-    )
+    def all(tableName: String, out: String = null): Unit = {
+      val lastDs = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
 
-    for (elem <- lookup) {
-      if (df.columns.contains(elem._1)) {
-        df = df.withColumnRenamed(elem._1, elem._2)
+      if (out != null) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS winhc_eci_dev.$out
+             |(
+             |    cid  STRING COMMENT 'FIELD'
+             |    ,$tableName BIGINT COMMENT 'FIELD'
+             |)
+             |COMMENT '临时摘要'
+             |""".stripMargin)
+      }
+
+      val cols = Seq("rowkey", "ds")
+      val df = sql(
+        s"""
+           |SELECT  split(rowkey,'_')[0] AS cid
+           |        ,COUNT(1) as ${tableName}
+           |FROM    (
+           |            SELECT  t1.*
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+           |            FROM    (
+           |                        SELECT  ${cols.mkString(",")}
+           |                        FROM    winhc_eci_dev.ads_company_change
+           |                        WHERE   ds = ${lastDs}
+           |                        UNION ALL
+           |                        SELECT  ${cols.mkString(",")}
+           |                        FROM    winhc_eci_dev.inc_ads_company_change
+           |                        WHERE   ds > $lastDs
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |GROUP BY split(rowkey,'_')[0]
+           |""".stripMargin)
+
+      if (out != null) {
+        df.createTempView("xjk_tmp_summary_test")
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$out
+             |SELECT cid,$tableName
+             |FROM
+             |    xjk_tmp_summary_test
+             |""".stripMargin)
+      } else {
+        import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._;
+        df.save2HBase("TEST_COMPANY_SUMMARY", "CID", Seq(tableName.toUpperCase()))
       }
     }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+
+    val Array(project, tableName) = args
+
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+
+    val spark = SparkUtils.InitEnv("CompanySummaryBySingle", config)
+
+    CompanySummaryBySingleUtil(spark, "winhc_eci_dev").all(tableName, out = "xjk_tmp_summ")
 
-    MongoSpark.save(df)
     spark.stop()
   }
 }
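
Usage note (not part of the commit; illustrative values only): the rewritten main() destructures args into project and tableName, so the entry point is driven with two positional arguments, for example:

// Minimal sketch of invoking the new entry point.
// "winhc_eci_dev" and "company_change" are illustrative values taken from the diff above.
CompanySummaryBySingle.main(Array("winhc_eci_dev", "company_change"))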