
feat: pull dishonest-debtor, enforcement, consumption-restriction, and terminated-case data

许家凯 3 years ago
parent commit 4682e3a5b5

+ 94 - 0
src/main/scala/com/winhc/bigdata/spark/ng/pull/PullExecutionPersonJob.scala

@@ -0,0 +1,94 @@
+package com.winhc.bigdata.spark.ng.pull
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/24 11:40
+ */
+
+case class PullExecutionPersonJob(s: SparkSession
+                                  , tn: String
+                                 ) extends LoggingUtils with BaseFunc {
+  @(transient @getter) val spark: SparkSession = s
+
+  private val target_tab = "winhc_ng.eci_execution_person_data"
+
+
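+  // Maps each source dimension table (tn) to the column that holds the person name.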
+  private val fieldMap = Map(
+    "company_dishonest_info" -> "name"
+    , "company_zxr" -> "name"
+    , "company_zxr_final_case" -> "name"
+    , "company_zxr_restrict" -> "person_name"
+  )
+
+  init()
+
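+  // Creates the partitioned target table on first run; idempotent via IF NOT EXISTS.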
+  private def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    name STRING COMMENT 'person name'
+         |    ,total BIGINT COMMENT 'record count for this name'
+         |)
+         |COMMENT 'per-person record counts across execution-related person lists'
+         |PARTITIONED BY
+         |(
+         |    tn STRING COMMENT 'partition: source table name'
+         |    ,ds STRING COMMENT 'partition: date'
+         |)
+         |LIFECYCLE 15
+         |""".stripMargin)
+  }
+
+
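+  // For a given ds: merge the full (ads_) and incremental (inc_ads_) tables, keep only the
+  // latest snapshot per rowkey (ROW_NUMBER over ds DESC), drop logically deleted rows
+  // (deleted = 0), then count records per person name and overwrite the target partition.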
+  def calc(ds: String): Unit = {
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(tn='$tn',ds='$ds')
+         |SELECT  ${fieldMap(tn)} as name
+         |        ,COUNT(1) AS total
+         |FROM    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    winhc_ng.ads_$tn
+         |                                    WHERE   ds > 0
+         |                                    UNION ALL
+         |                                    SELECT  *
+         |                                    FROM    winhc_ng.inc_ads_$tn
+         |                                    WHERE   ds > 0
+         |                                )
+         |                    )
+         |            WHERE   num = 1
+         |            AND     deleted = 0
+         |        )
+         |GROUP BY ${fieldMap(tn)}
+         |""".stripMargin)
+  }
+}
+
+
+object PullExecutionPersonJob {
+  def main(args: Array[String]): Unit = {
+    val Array(tn, ds) = args
+    val project = "winhc_ng"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    PullExecutionPersonJob(spark, tn).calc(ds)
+    spark.stop()
+  }
+}
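
For reference, a minimal sketch of how the job is driven end to end. The argument values below are illustrative (tn must be one of the keys in fieldMap; ds is the target partition date), not taken from the commit:

    // Pulls company_zxr and writes partition (tn='company_zxr', ds='20210524').
    PullExecutionPersonJob.main(Array("company_zxr", "20210524"))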