@@ -0,0 +1,104 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 16:33
+ * @Description: recompute the per-company record count for the given table (full + incremental partitions) for the COMPANY_SUMMARY output
+ */
+case class CompanyIncSummary(s: SparkSession,
+                             project: String, // project (database) that owns the tables
+                             tableName: String, // main table name (without the ads_/inc_ads_ prefix)
+                             cidField: String, // company id field name
+                             dupliCols: Seq[String] // columns that form the deduplication key
+                            ) extends LoggingUtils {
+  @(transient @getter) val spark: SparkSession = s
+  private val f_bytes: Array[Byte] = Bytes.toBytes("F")
+  private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
+
+  def calc(): Unit = {
+    val ads_table = s"${project}.ads_$tableName" // full (stock) ads table
+    val inc_ads_table = s"${project}.inc_ads_$tableName" // incremental ads table
+
+
+    val partition = sql(s"show partitions $ads_table").collect.toList
+      .map(_.getString(0).split("=")(1))
+      .last
+
+
+    val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY_SUMMARY")
+
+    val ads_table_cols = spark.table(ads_table).columns.filter(l => {
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+    }).toList.sorted
+
+    val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+    }).toList.sorted
+
+    val new_cols = (ads_table_cols ::: inc_ads_table_cols).distinct.sorted
+    if (new_cols.size != inc_ads_table_cols.size || new_cols.size != ads_table_cols.size) {
+      println(ads_table_cols)
+      println(inc_ads_table_cols)
+      println("columns of ads and inc_ads tables do not match!")
+      sys.exit(-99)
+    }
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${project}.xjk_tmp_count_$tableName as
+         |SELECT ${cidField} as cid
+         |       ,COUNT(1) as num
+         |FROM (
+         |    SELECT tmp.*
+         |           ,ROW_NUMBER() OVER(PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) c
+         |    FROM (
+         |        SELECT ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+         |        FROM (
+         |            SELECT DISTINCT $cidField as $cidField
+         |            FROM $inc_ads_table
+         |            WHERE ds > '$partition'
+         |        ) id_table
+         |        JOIN (
+         |            SELECT *
+         |            FROM $ads_table
+         |            WHERE ds = '$partition'
+         |        ) org_tab
+         |        ON id_table.$cidField = org_tab.$cidField
+         |        UNION ALL
+         |        SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+         |        FROM $inc_ads_table
+         |        WHERE ds > '$partition'
+         |    ) AS tmp
+         |) tmp2
+         |WHERE tmp2.c = 1
+         |GROUP BY $cidField
+         |""".stripMargin)
+//      .write.mode("overwrite").saveAsTable(s"${project}.xjk_tmp_count_$tableName")
+    /* .rdd.map(row => {
+        val id = row(0).asInstanceOf[String]
+        val num = row(1).asInstanceOf[String]
+        val put = new Put(Bytes.toBytes(id))
+        if (!"0".equals(num)) {
+          put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
+          (new ImmutableBytesWritable, put)
+        } else {
+          return null
+        }
+      }).filter(_ != null).saveAsHadoopDataset(jobConf)*/
+  }
+
+  def getCastCols(name: String, pre: String): String = {
+    val list = List("cid", "new_cid", "ncid")
+    if (list.contains(name)) {
+      return s"CAST(${pre}${name} as BIGINT) $name"
+    }
+    pre + name
+  }
+}
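
For context, a minimal usage sketch, not taken from this change: it assumes the SparkSession is created elsewhere in the project and that the LoggingUtils mix-in provides the sql(...) helper used above; the project, table, and column names below are illustrative placeholders.

import org.apache.spark.sql.SparkSession

object CompanyIncSummaryDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical session setup; the real project presumably builds this via a shared helper.
    val spark = SparkSession.builder()
      .appName("CompanyIncSummary demo")
      .enableHiveSupport()
      .getOrCreate()

    // Illustrative arguments: project, main table name (no ads_/inc_ads_ prefix),
    // company-id column, and deduplication key columns (all assumed, not from the diff).
    CompanyIncSummary(
      s = spark,
      project = "some_project",
      tableName = "company_holder",
      cidField = "cid",
      dupliCols = Seq("cid", "holder_id")
    ).calc()

    spark.stop()
  }
}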
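
One caveat on the commented-out HBase write path: "return null" inside an RDD map closure is a non-local return, which fails at runtime inside a Spark closure (NonLocalReturnControl) if the block is ever re-enabled as written. Below is a hedged sketch of the same idea using flatMap, which simply drops zero-count rows; it assumes the query result is available as an RDD of (cid, count) pairs and reuses the column family/qualifier and jobConf conventions from the class above.

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

object CompanySummarySink {
  // Sketch only: summary stands for the (cid, num) result of the query above,
  // and f_bytes / name_bytes / jobConf for the values defined in CompanyIncSummary.
  def writeSummary(summary: RDD[(String, Long)],
                   f_bytes: Array[Byte],
                   name_bytes: Array[Byte],
                   jobConf: JobConf): Unit = {
    summary
      .flatMap { case (cid, num) =>
        if (num != 0L) {
          val put = new Put(Bytes.toBytes(cid))
          put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num.toString))
          Some((new ImmutableBytesWritable, put))
        } else {
          None // skip companies with no rows instead of returning null from the closure
        }
      }
      .saveAsHadoopDataset(jobConf)
  }
}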