Ver código fonte

feat: 企业动态接入并发提job

许家凯 4 anos atrás
pai
commit
462a3d3302

+ 17 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -5,7 +5,7 @@ import java.util.Date
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.StringType
@@ -253,7 +253,21 @@ object CompanyDynamic {
     val cd = CompanyDynamicUtil(spark, project, ds)
     cd.init()
 
-    if (tableNames.equals("all")) {
+    var start = startArgs
+    if (!tableNames.equals("all")) {
+      val set = tableNames.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    val a = start.map(e => (e.tableName, () => {
+      cd.calc(e.tableName, e.bName)
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+
+    /*if (tableNames.equals("all")) {
       startArgs.foreach(e => {
         cd.calc(e.tableName, e.bName)
       })
@@ -265,7 +279,7 @@ object CompanyDynamic {
       }).foreach(e => {
         cd.calc(e.tableName, e.bName)
       })
-    }
+    }*/
 
     spark.stop()
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -16,14 +16,14 @@ object AsyncExtract {
 
   def startAndWait(spark: SparkSession, seq: Seq[(String, () => Boolean)]): Unit = {
     start(seq)
-    wait(spark, jobSize = seq.length)
+    wait(spark)
   }
 
   def start(seq: Seq[(String, () => Boolean)]): Unit = {
     AsyncExtract(seq).start()
   }
 
-  def wait(spark: SparkSession, jobSize: Int = 0): Unit = {
+  def wait(spark: SparkSession): Unit = {
     val tracker = spark.sparkContext.statusTracker
 
     val ints = tracker.getJobIdsForGroup(null)