
feat: enhanced summary

许家凯 committed 4 years ago · parent commit 840dd03f9e
1 changed file with 229 additions and 0 deletions

+ 229 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanySummaryPro.scala

@@ -0,0 +1,229 @@
+package com.winhc.bigdata.spark.utils
+
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/10/10 10:23
+ * @Description: enhanced summary
+ */
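+// Describes an optional extra group-by dimension: `field` is the column to
+// group on, and `value_alias` maps each expected field value to the output
+// column that carries its count. Hypothetical example:
+//   GroupByInfo("deleted", Seq(("0", "cnt_valid"), ("1", "cnt_deleted")))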
+case class GroupByInfo(field: String, value_alias: Seq[(String, String)])
+
+case class CompanySummaryPro(s: SparkSession,
+                             project: String, // project the table lives in
+                             tableName: String, // table name (without prefix)
+                             cidField: String, // company-id field name, e.g. split(rowkey,'_')[0]
+                             groupByInfo: GroupByInfo = null, // optional extra group-by condition
+                             where: String = "", // WHERE condition, e.g. deleted = 0
+                             sortField: String = "ds"
+                            ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val f_bytes: Array[Byte] = Bytes.toBytes("F")
+  private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
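+  // HBase column family/qualifier bytes; they appear unused here because the
+  // save2HBase call in calc() is commented out.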
+  val ads_table = s"${project}.ads_$tableName" // full (historical) ads table
+  val inc_ads_table = s"${project}.inc_ads_$tableName"
+
+  val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
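+  // Only columns present in both tables, so the UNION ALL branches line up.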
+
+  private def create_or_replace_summary(target_tab: String): Unit = {
+    val ddl =
+      if (groupByInfo == null) s"$tableName BIGINT"
+      else groupByInfo.value_alias.map(r => s"${r._2} BIGINT").mkString(",")
+    val d =
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    cid BIGINT
+         |    ,${ddl}
+         |)
+         |COMMENT 'summary tmp,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+         |""".stripMargin
+
+    if (spark.catalog.tableExists(target_tab)) {
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS $target_tab
+           |""".stripMargin)
+    }
+
+    sql(d)
+  }
+
+  def calc(is_inc: Boolean = true, target_tab: String = ""): Unit = {
+    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+    val wh =
+      if (StringUtils.isEmpty(where)) ""
+      else s"AND   $where"
+    val tmp_tab = "all_data_summary_tmp"
+
+    is_inc match {
+      case true => {
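+        // Incremental pass: pick the companies touched after the last full
+        // partition, join them back to the latest full snapshot, then append
+        // the new incremental rows themselves.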
+        sql(
+          s"""
+             |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+             |FROM    (
+             |            SELECT  DISTINCT $cidField as xjk_cid
+             |            FROM    $inc_ads_table
+             |            WHERE   ds > $ads_last_ds
+             |        ) id_table
+             |JOIN (
+             |              SELECT  *
+             |                      ,$cidField as xjk_cid
+             |              FROM    $ads_table
+             |              WHERE   ds = '$ads_last_ds'
+             |          ) org_tab
+             |ON      id_table.xjk_cid = org_tab.xjk_cid
+             |UNION ALL
+             |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+             |FROM    $inc_ads_table
+             |WHERE   ds > $ads_last_ds
+             |""".stripMargin)
+          .createOrReplaceTempView(tmp_tab)
+      }
+      case false => {
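+        // Full pass: the latest full partition plus every incremental partition after it.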
+        sql(
+          s"""
+             |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+             |FROM    $ads_table
+             |WHERE   ds = '$ads_last_ds'
+             |UNION ALL
+             |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+             |FROM    $inc_ads_table
+             |WHERE   ds > $ads_last_ds
+             |""".stripMargin)
+          .createOrReplaceTempView(tmp_tab)
+
+      }
+    }
+
+    val distinct_tab = s"${tmp_tab}_distinct"
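+    // Keep only the newest record per rowkey (ordered by sortField), then
+    // apply the optional WHERE filter.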
+    sql(
+      s"""
+         |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+         |FROM    (
+         |            SELECT  tmp.*
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $sortField DESC ) c
+         |            FROM    $tmp_tab AS tmp
+         |        ) tmp2
+         |WHERE   tmp2.c = 1
+         |$wh
+         |""".stripMargin)
+      .createOrReplaceTempView(distinct_tab)
+
+
+    val view =
+      if (groupByInfo == null) s"arr[0] as $tableName"
+      else groupByInfo.value_alias.indices
+        .map(i => s"arr[$i] as ${groupByInfo.value_alias(i)._2}")
+        .mkString(",")
+
+    // Register the UDF that spreads each group's count across the output columns
+    if (groupByInfo != null) {
+      val fieldSeq = groupByInfo.value_alias
+
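+      // Build a count vector aligned with value_alias: the slot whose value
+      // matches group_val gets num, every other slot gets 0.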
+      def getResArr(group_val: String, num: Long): Seq[Long] = {
+        val res = scala.collection.mutable.ArrayBuffer[Long]()
+        for (i <- fieldSeq) {
+          if (i._1.equals(group_val)) {
+            res += num
+          } else {
+            res += 0
+          }
+        }
+        res.toSeq
+      }
+
+      spark.udf.register("xjk_func", getResArr _)
+    }
+
+    val groupKey_show =
+      if (groupByInfo == null) s",array(count(1)) as arr"
+      else s",xjk_func(cast(${groupByInfo.field} as STRING),count(1)) as arr"
+
+    val groupKey =
+      if (groupByInfo == null) ""
+      else s",${groupByInfo.field}"
+
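+    // Aggregate per company (plus the optional group field) and pivot the
+    // counts into the summary columns.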
+    sql(
+      s"""
+         |SELECT  cid
+         |        ,${view}
+         |FROM    (
+         |        SELECT  $cidField as cid
+         |                $groupKey_show
+         |        FROM    $distinct_tab
+         |        GROUP BY $cidField ${groupKey}
+         |)
+         |""".stripMargin)
+      .createOrReplaceTempView("summary_tab")
+
+    if (StringUtils.isEmpty(target_tab)) {
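+      // No target table given: the HBase write below is kept for reference
+      // but currently disabled.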
+      /*import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      val writF = getColumns("summary_tab").diff(Seq("cid"))
+      sql(
+        s"""
+           |SELECT *
+           |FROM
+           |    summary_tab
+           |""".stripMargin)
+        .save2HBase("COMPANY_SUMMARY", "cid", writF)*/
+    } else {
+      create_or_replace_summary(target_tab)
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab
+           |SELECT *
+           |FROM
+           |    summary_tab
+           |""".stripMargin)
+    }
+  }
+
+
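+  // Cast company-id-like columns to BIGINT so both UNION branches agree on types.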
+  private def getCastCols(name: String, pre: String): String = {
+    val idCols = List("cid", "new_cid", "ncid")
+    if (idCols.contains(name)) s"CAST(${pre}${name} as BIGINT) $name"
+    else pre + name
+  }
+
+}
+
+object CompanySummaryPro {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    CompanySummaryPro(s = spark
+      , project = "winhc_eci_dev"
+      , tableName = "company_judicial_assistance_list"
+      , cidField = "split(rowkey,'_')[0]"
+      , groupByInfo = null
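+      // A grouped run could instead pass, e.g. (hypothetical mapping):
+      //   groupByInfo = GroupByInfo("deleted", Seq(("0", "cnt_valid"), ("1", "cnt_deleted")))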
+      , where = "deleted = 0"
+      , sortField = "ds")
+      .calc(is_inc = false, target_tab = "winhc_eci_dev.xjk_tmp_summary")
+
+    spark.stop()
+  }
+}