Kaynağa Gözat

feat: 拉执行人数据支持多参数

许家凯 3 yıl önce
ebeveyn
işleme
572a1c3e8b

+ 7 - 4
src/main/scala/com/winhc/bigdata/spark/ng/pull/PullExecutionPersonJob.scala

@@ -48,7 +48,7 @@ case class PullExecutionPersonJob(s: SparkSession
   }
 
 
-  def calc(ds: String): Unit = {
+  def calc(ds: String, deleted: String): Unit = {
     sql(
       s"""
          |INSERT OVERWRITE TABLE $target_tab PARTITION(tn='$tn',ds='$ds')
@@ -70,7 +70,7 @@ case class PullExecutionPersonJob(s: SparkSession
          |                                )
          |                    )
          |            WHERE   num = 1
-         |            AND     deleted = 0
+         |            AND     deleted = $deleted
          |        )
          |GROUP BY ${fieldMap(tn)}
          |""".stripMargin)
@@ -80,7 +80,10 @@ case class PullExecutionPersonJob(s: SparkSession
 
 object PullExecutionPersonJob {
   def main(args: Array[String]): Unit = {
-    val Array(tn, ds) = args
+    val Array(tn, ds, deleted) = args
+    if (deleted.startsWith("$")) {
+      throw new RuntimeException("deleted value is null: " + deleted)
+    }
     val project = "winhc_ng"
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
@@ -88,7 +91,7 @@ object PullExecutionPersonJob {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    PullExecutionPersonJob(spark, tn).calc(ds)
+    PullExecutionPersonJob(spark, tn).calc(ds, deleted)
     spark.stop()
   }
 }