Sfoglia il codice sorgente

feat: 企业银行帐户维度

许家凯 4 anni fa
parent
commit
5832400e51

+ 123 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/company_account_with_bank.scala

@@ -0,0 +1,123 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/9 16:52
+ */
+case class company_account_with_bank(s: SparkSession,
+                                     project: String
+                                    ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    val inc_ads_tab = s"$project.inc_ads_company_account_with_bank"
+    val inc_ods_tab = s"$project.inc_ods_company_account_with_bank"
+    var inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
+    val inc_ods_last_ds = getLastPartitionsOrElse(inc_ods_tab, null)
+    if (inc_ods_last_ds == null) {
+      println(s"$inc_ods_tab is empty!!!")
+      return
+    }
+    if (inc_ads_last_ds.equals(inc_ods_last_ds)) {
+      println("rerun...")
+      inc_ads_last_ds = getSecondLastPartitionOrElse(inc_ads_tab, "0")
+    }
+    val all_cols = getColumns(inc_ads_tab)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_last_ds')
+         |SELECT  ${all_cols.diff(Seq("ds")).mkString(",")}
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  t2.current_cid AS rowkey
+         |                                ,t2.current_cid as cid
+         |                                ,t1.name
+         |                                ,t1.credit_code
+         |                                ,t1.address
+         |                                ,t1.phone
+         |                                ,t1.bank
+         |                                ,t1.bank_account
+         |                                ,t1.create_time
+         |                                ,t1.update_time
+         |                                ,t1.deleted
+         |                                ,t1.ds
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    $inc_ads_tab
+         |                                    WHERE   ds > 0
+         |                                ) AS t1
+         |                        JOIN    (
+         |                                    SELECT  *
+         |                                    FROM    winhc_eci_dev.inc_ods_company
+         |                                    WHERE   ds > '$inc_ads_last_ds'
+         |                                    AND     current_cid IS NOT NULL
+         |                                ) AS t2
+         |                        ON      t1.cid = t2.cid
+         |                        UNION ALL
+         |                        SELECT  cid AS rowkey
+         |                                ,cid
+         |                                ,name
+         |                                ,credit_code
+         |                                ,address
+         |                                ,phone
+         |                                ,bank
+         |                                ,bank_account
+         |                                ,create_time
+         |                                ,update_time
+         |                                ,deleted
+         |                                ,ds
+         |                        FROM    $inc_ods_tab
+         |                        WHERE   ds > '$inc_ads_last_ds'
+         |                    ) AS t3
+         |        ) AS t4
+         |WHERE   t4.num = 1
+         |""".stripMargin)
+    import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    $inc_ads_tab
+         |WHERE   ds = '$inc_ods_last_ds'
+         |""".stripMargin)
+      .save2HBase("COMPANY_ACCOUNT_WITH_BANK", "rowkey", all_cols.diff(Seq("rowkey", "ds")))
+
+    sql(
+      s"""
+         |SELECT  DISTINCT rowkey as cid
+         |        ,1 AS company_account_with_bank
+         |FROM    $inc_ads_tab
+         |WHERE   ds = '$inc_ods_last_ds'
+         |""".stripMargin)
+      .save2HBase("COMPANY_SUMMARY", "cid", Seq("company_account_with_bank"))
+  }
+}
+
+
+object company_account_with_bank {
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    company_account_with_bank(s = spark, project = project).calc()
+    spark.stop()
+  }
+}